package dev;

import com.alibaba.middleware.race.sync.Constants2;
import com.alibaba.middleware.race.sync.Server;
import io.netty.buffer.ByteBuf;
import net.NetworkServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Development variant of the sync server.
 *
 * <p>Replays the change records found in the data files returned by
 * {@link Constants2#loadDataFile} into a {@link StoreService}, keeping only the
 * records whose schema and table match {@code Constants2.schema} and
 * {@code Constants2.table}, and then streams every valid stored record to the
 * client through {@link NetworkServer}.
 *
 * <p>Record layout as consumed by this parser: one record per line (terminated by
 * {@code '\n'}), columns terminated by {@code '|'}:
 * <pre>
 *   three unused columns | schema | table | op | pk descriptor | ...
 *   op = I : unused column | pk value | { column descriptor | unused column | value } ...
 *   op = U : old pk | new pk | { name:... descriptor | old value | new value } ...
 *   op = D : pk value | rest of the line is ignored
 * </pre>
 *
 * Author :  Rocky
 * Date : 07/06/2017 15:24
 */
public class ServerV1 {

    private static final Logger log = LoggerFactory.getLogger(ServerV1.class);

    /** Byte that terminates a record line ({@code '\n'}, 10). */
    private static final byte LINE_FEED = '\n';

    /** Byte that terminates a column ({@code '|'}, 124). */
    private static final byte COLUMN_SEPARATOR = '|';

    /** Byte that terminates the name part of a column descriptor ({@code ':'}, 58). */
    private static final byte NAME_SEPARATOR = ':';

    /** Number of additional data files that are attempted after the first one. */
    private static final int REMAINING_DATA_FILE_COUNT = 9;

    /** Bytes reserved per stored record for the primary key. */
    private static final int PK_SLOT_SIZE = 4;

    /** Bytes reserved per stored record for a numeric column (sent/read as a 3-byte medium). */
    private static final int NUM_SLOT_SIZE = 3;

    /** Bytes reserved per stored record for a string column: 1 length byte + up to 6 content bytes. */
    private static final int STRING_SLOT_SIZE = 7;

    /** Largest accepted string value, in bytes. */
    private static final int MAX_STRING_BYTES = 6;

    // Network component used to talk to the client.
    private static final NetworkServer networkServer = new NetworkServer(Constants2.SERVER_PORT);

    // Column values of the insert record currently being parsed (reused across records).
    private static final List<Object> insertedColumns = new ArrayList<>(10);

    // Old/new column values of the update record currently being parsed (reused across records).
    private static final Map<String, MapEntry> updatedColumns = new HashMap<>();

    // Scratch buffer into which a single column is copied before it is decoded.
    private static final ByteBuffer changeRecordBuffer = ByteBuffer.allocate(100 * 1024);

    // Storage service; created once the record size is known (see readFirstInsertRecord).
    private static StoreService storeService;

    /**
     * Entry point: loads the data files one by one, replays them and sends the result.
     *
     * @param args forwarded to {@code Constants2.readArgs}; when {@code null} or empty a
     *             built-in default argument set is used
     * @throws IOException declared by the original entry point
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length == 0) {
            args = new String[]{"middleware", "student", "0", "75896000000000"};
        }

        Constants2.readArgs(args);

        // Load the first data file.
        List<MappedByteBuffer> mbbs = Constants2.loadDataFile(1, false);
        MappedByteBuffer dataMbb = mbbs.get(0);

        // The first matching insert record provides the table metadata.
        readFirstInsertRecord(dataMbb, changeRecordBuffer, insertedColumns);

        log.info(RecordMeta.getInfo());

        // Replay the rest of the first file.
        handleDataFile(dataMbb);

        // Replay the remaining data files; an empty load result is skipped.
        for (int i = 0; i < REMAINING_DATA_FILE_COUNT; i++) {
            mbbs = Constants2.loadDataFile(1, false);
            if (mbbs.isEmpty()) {
                continue;
            }

            handleDataFile(mbbs.get(0));
            Constants2.printMiddleFiles();
        }

        networkServer.flush();
        // NOTE(review): sendResult() sends the end flag again after the result data;
        // confirm against the client protocol that two end flags are intended.
        networkServer.sendEndFlag();

        sendResult();
    }

    /**
     * Replays every remaining record of one data file into the store.
     *
     * <p>Records of other schemas/tables are skipped. A record with an operation other
     * than I/U/D is skipped up to its line end so that parsing resumes at the next record.
     *
     * @param dataMbb data file, positioned at the start of a record
     */
    private static void handleDataFile(MappedByteBuffer dataMbb) {
        while (dataMbb.hasRemaining()) {
            changeRecordBuffer.clear();
            insertedColumns.clear();

            String opt = readTargetOperation(dataMbb, changeRecordBuffer);
            if (opt == null) {
                // Not our schema/table; the line has already been consumed.
                continue;
            }

            // Skip the primary key descriptor column.
            discardColumn(dataMbb);

            if ("I".equals(opt)) {
                applyInsertRecord(dataMbb);
            } else if ("U".equals(opt)) {
                applyUpdateRecord(dataMbb);
            } else if ("D".equals(opt)) {
                applyDeleteRecord(dataMbb);
            } else {
                // Unknown operation: drop the rest of the line instead of re-entering
                // the loop in the middle of a record.
                discardLine(dataMbb);
            }
        }

        log.info("成功处理一个数据文件");
    }

    /**
     * Consumes the leading columns of a record and returns its operation type.
     *
     * @param dataMbb data file, positioned at the start of a record
     * @param scratch scratch buffer used to decode columns
     * @return the operation column, or {@code null} when the record belongs to another
     *         schema or table (in which case the whole line has been discarded)
     */
    private static String readTargetOperation(MappedByteBuffer dataMbb, ByteBuffer scratch) {
        // Three leading columns are not used by this parser.
        discardColumn(dataMbb);
        discardColumn(dataMbb);
        discardColumn(dataMbb);

        String schema = readColumn(dataMbb, scratch);
        if (!schema.equals(Constants2.schema)) {
            discardLine(dataMbb);
            return null;
        }

        String table = readColumn(dataMbb, scratch);
        if (!table.equals(Constants2.table)) {
            discardLine(dataMbb);
            return null;
        }

        return readColumn(dataMbb, scratch);
    }

    /** Parses the remainder of an insert record (after the pk descriptor) and stores it. */
    private static void applyInsertRecord(MappedByteBuffer dataMbb) {
        // Skip the column preceding the pk value (the original author notes it holds null).
        discardColumn(dataMbb);

        int pk = readIntColumn(dataMbb, changeRecordBuffer);

        // Index 0 of fieldsDataTypeList is the primary key, so regular columns start at 1.
        int columnIndex = 1;

        // The byte consumed by the loop test is either the line end or the first byte of
        // the next column descriptor, which is discarded anyway.
        while (dataMbb.get() != LINE_FEED) {
            // Skip the rest of the column descriptor.
            discardColumn(dataMbb);

            // Skip the column preceding the value (noted as null by the original author).
            discardColumn(dataMbb);

            if (RecordMeta.fieldsDataTypeList.get(columnIndex++).getValue().equals(RecordMeta.NUM_DATA_TYPE)) {
                insertedColumns.add(readIntColumn(dataMbb, changeRecordBuffer));
            } else {
                insertedColumns.add(readBytesColumn(dataMbb, changeRecordBuffer));
            }
        }

        handleInsert(pk, insertedColumns);
    }

    /** Parses the remainder of an update record (after the pk descriptor) and applies it. */
    private static void applyUpdateRecord(MappedByteBuffer dataMbb) {
        // Primary key before and after the change.
        int pkOldValue = readIntColumn(dataMbb, changeRecordBuffer);
        int pkNewValue = readIntColumn(dataMbb, changeRecordBuffer);

        updatedColumns.clear();
        byte first;
        while ((first = dataMbb.get()) != LINE_FEED) {
            // The byte consumed by the loop test is the first byte of the column name.
            changeRecordBuffer.put(first);
            String columnName = readColumnName(dataMbb, changeRecordBuffer);

            // Skip the rest of the column descriptor.
            discardColumn(dataMbb);

            if (RecordMeta.fieldsDataTypeMap.get(columnName).equals(RecordMeta.NUM_DATA_TYPE)) {
                // Value before the change, then value after the change.
                int oldValue = readIntColumn(dataMbb, changeRecordBuffer);
                int newValue = readIntColumn(dataMbb, changeRecordBuffer);

                updatedColumns.put(columnName, new MapEntry(oldValue, newValue));
            } else {
                // Value before the change, then value after the change.
                byte[] oldValue = readBytesColumn(dataMbb, changeRecordBuffer);
                byte[] newValue = readBytesColumn(dataMbb, changeRecordBuffer);

                updatedColumns.put(columnName, new MapEntry(oldValue, newValue));
            }
        }

        handleUpdate(pkOldValue, pkNewValue, updatedColumns);
    }

    /** Parses the remainder of a delete record (after the pk descriptor) and applies it. */
    private static void applyDeleteRecord(MappedByteBuffer dataMbb) {
        int pk = readIntColumn(dataMbb, changeRecordBuffer);

        // Nothing else of a delete record is needed.
        discardLine(dataMbb);

        handleDelete(pk);
    }

    /**
     * Locates the first insert record of the target schema/table, derives the table
     * metadata from it (column names, data types, per-record offsets and record size),
     * creates the {@link StoreService} and stores that first record.
     *
     * @param dataMbb    data file, positioned at the start of a record
     * @param dataBuffer scratch buffer used to decode columns
     * @param data       receives the non-pk column values of the record
     * @throws IllegalStateException if the file contains no matching insert record
     */
    private static void readFirstInsertRecord(MappedByteBuffer dataMbb, ByteBuffer dataBuffer, List<Object> data) {

        // Find the first insert record of the target table.
        boolean found = false;
        while (dataMbb.hasRemaining()) {
            String opt = readTargetOperation(dataMbb, dataBuffer);
            if (opt == null) {
                continue;
            }
            if (!"I".equals(opt)) {
                discardLine(dataMbb);
                continue;
            }

            found = true;
            break;
        }

        if (!found) {
            throw new IllegalStateException("no insert record found for schema '"
                    + Constants2.schema + "' and table '" + Constants2.table + "'");
        }

        // Name part of the pk descriptor.
        String pkColumnName = readColumnName(dataMbb, dataBuffer);

        // Skip the rest of the pk descriptor.
        discardColumn(dataMbb);

        // Skip the column preceding the pk value (noted as null by the original author).
        discardColumn(dataMbb);

        int pkNewValue = readIntColumn(dataMbb, dataBuffer);

        RecordMeta.addFieldDataType(pkColumnName, RecordMeta.PK_DATA_TYPE);

        // Record layout: 1 flag byte, then the primary key, then the columns.
        RecordMeta.sizePerRecord = 1;
        RecordMeta.fieldsOffset.put(pkColumnName, RecordMeta.sizePerRecord);
        RecordMeta.sizePerRecord += PK_SLOT_SIZE;

        byte first;
        while ((first = dataMbb.get()) != LINE_FEED) {

            // The byte consumed by the loop test is the first byte of the descriptor.
            dataBuffer.put(first);
            String[] columnDesc = readColumn(dataMbb, dataBuffer).split(":");
            String columnName = columnDesc[0];
            String columnDataType = columnDesc[1];

            RecordMeta.addFieldDataType(columnName, columnDataType);

            // Skip the column preceding the value (noted as null by the original author).
            discardColumn(dataMbb);

            // The column starts at the current record size.
            RecordMeta.fieldsOffset.put(columnName, RecordMeta.sizePerRecord);
            if (columnDataType.equals(RecordMeta.NUM_DATA_TYPE)) {
                // Numeric columns occupy 3 bytes.
                RecordMeta.sizePerRecord += NUM_SLOT_SIZE;
                data.add(readIntColumn(dataMbb, dataBuffer));
            } else {
                // String columns occupy 7 bytes: | 1 length byte | up to 6 content bytes |
                RecordMeta.sizePerRecord += STRING_SLOT_SIZE;
                data.add(readBytesColumn(dataMbb, dataBuffer));
            }
        }

        RecordMeta.encodeFieldNames();

        // The store can only be created once the record size is known.
        storeService = new StoreService(RecordMeta.sizePerRecord);

        handleInsert(pkNewValue, data);
    }


    /** Stores a new record with the given primary key and column values. */
    private static void handleInsert(int pk, List<Object> columns) {
        storeService.insertRecord(pk, columns);
    }

    /** Deletes the record with the given primary key. */
    private static void handleDelete(int pkNewValue) {
        storeService.deleteRecord(pkNewValue);
    }

    /** Applies the changed columns (and a possible primary key change) to a record. */
    private static void handleUpdate(int pkOldValue, int pkNewValue, Map<String, MapEntry> updatedData) {
        storeService.updateRecord(pkOldValue, pkNewValue, updatedData);
    }


    /**
     * Reads bytes up to (and consuming) the next {@code ':'} and returns everything
     * accumulated in {@code dest}, including bytes the caller put there beforehand.
     * {@code dest} is cleared afterwards.
     */
    private static String readColumnName(MappedByteBuffer src, ByteBuffer dest) {
        copyUntil(src, dest, NAME_SEPARATOR);
        return drainToString(dest);
    }

    /** Skips bytes up to and including the next {@code '|'}. */
    private static void discardColumn(MappedByteBuffer mbb) {
        while (mbb.get() != COLUMN_SEPARATOR) {
            // skip
        }
    }

    /** Skips bytes up to and including the next {@code '\n'}. */
    private static void discardLine(MappedByteBuffer mbb) {
        while (mbb.get() != LINE_FEED) {
            // skip
        }
    }

    /**
     * Reads bytes up to (and consuming) the next {@code '|'} and returns everything
     * accumulated in {@code dest}, including bytes the caller put there beforehand.
     * {@code dest} is cleared afterwards.
     */
    private static String readColumn(ByteBuffer src, ByteBuffer dest) {
        copyUntil(src, dest, COLUMN_SEPARATOR);
        return drainToString(dest);
    }

    /**
     * Reads one column as raw bytes. {@code dest} is cleared afterwards.
     *
     * <p>Terminates the JVM when the value is longer than 6 bytes, because a string
     * slot of a stored record cannot hold more.
     */
    private static byte[] readBytesColumn(ByteBuffer src, ByteBuffer dest) {
        copyUntil(src, dest, COLUMN_SEPARATOR);

        byte[] result = new byte[dest.position()];
        System.arraycopy(dest.array(), 0, result, 0, result.length);
        dest.clear();

        if (result.length > MAX_STRING_BYTES) {
            log.error("wrong");
            System.exit(-1);
        }

        return result;
    }

    /**
     * Reads one column and parses it as a decimal {@code int}.
     *
     * @throws NumberFormatException if the column is not a valid {@code int}
     */
    private static int readIntColumn(ByteBuffer src, ByteBuffer dest) {
        return Integer.parseInt(readColumn(src, dest));
    }

    /** Appends bytes from {@code src} to {@code dest} until {@code delimiter} has been consumed. */
    private static void copyUntil(ByteBuffer src, ByteBuffer dest, byte delimiter) {
        byte b;
        while ((b = src.get()) != delimiter) {
            dest.put(b);
        }
    }

    /** Decodes the bytes written to {@code dest} so far and clears the buffer. */
    private static String drainToString(ByteBuffer dest) {
        String text = new String(dest.array(), 0, dest.position());
        dest.clear();
        return text;
    }

    /**
     * Streams every valid stored record to the client.
     *
     * <p>Each outgoing buffer starts with a 4-byte length field that is patched with the
     * final writer index right before the buffer is sent. Per record the primary key is
     * written as an int, numeric columns as 3-byte mediums and string columns as one
     * length byte followed by the content bytes.
     */
    private static void sendResult() {
        log.info("发送结果给client");
        ByteBuf resultBuffer = initResultBuffer();

        MappedByteBuffer msgMbb = storeService.getMsgMbb();
        while (msgMbb.hasRemaining()) {

            // The flag byte has been read already, so skip the remaining size - 1 bytes.
            if (!isValidFlag(msgMbb.get())) {
                msgMbb.position(msgMbb.position() + RecordMeta.sizePerRecord - 1);
                continue;
            }

            int pk = msgMbb.getInt();
            resultBuffer.writeInt(pk);

            // Index 0 is the primary key, which has been handled above.
            for (int i = 1; i < RecordMeta.fieldsDataTypeList.size(); i++) {
                MapEntry entry = RecordMeta.fieldsDataTypeList.get(i);

                if (entry.getValue().equals(RecordMeta.NUM_DATA_TYPE)) {
                    int value = MiscUtils.readMedium(msgMbb);
                    resultBuffer.writeMedium(value);
                } else {
                    int slotStart = msgMbb.position();

                    byte strByteSize = msgMbb.get();
                    byte[] strBytes = new byte[strByteSize];
                    msgMbb.get(strBytes);

                    // The slot has a fixed width regardless of the actual string length.
                    msgMbb.position(slotStart + STRING_SLOT_SIZE);

                    resultBuffer.writeByte(strByteSize);
                    resultBuffer.writeBytes(strBytes);
                }
            }

            if (resultBuffer.writerIndex() >= Constants2.SEND_BUFFER_SIZE) {
                resultBuffer.setInt(0, resultBuffer.writerIndex());
                networkServer.sendResultData(resultBuffer);
                resultBuffer = initResultBuffer();
            }
        }

        resultBuffer.setInt(0, resultBuffer.writerIndex());
        networkServer.sendResultData(resultBuffer);
        networkServer.sendEndFlag();
    }

    /**
     * Creates a fresh outgoing buffer whose first 4 bytes are a placeholder for the
     * length field that {@link #sendResult()} fills in before sending.
     */
    private static ByteBuf initResultBuffer() {
        ByteBuf buf = MiscUtils.directBuffer(Constants2.SEND_BUFFER_SIZE);
        buf.writeInt(4);
        return buf;
    }

    /** Returns {@code true} when the highest bit of the record flag byte is set. */
    private static boolean isValidFlag(byte flag) {
        return (flag & 0x80) != 0;
    }
}
